package net.bwie.realtime.guanjuntao.util;

import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/**
 * Utility methods for writing Flink data streams into Doris tables.
 *
 * <p>This class only holds static helpers and is not meant to be instantiated.
 *
 * @author xuanyu
 */
public final class DorisUtil {

    /** Value passed to {@code DorisOptions.Builder#setFenodes} by the three-argument overload. */
    private static final String DEFAULT_FE_NODES = "node102:8030";

    /** Doris user name used by the three-argument overload. */
    private static final String DEFAULT_USERNAME = "root";

    /**
     * Doris password used by the three-argument overload.
     *
     * <p>NOTE(review): this credential is hard-coded in source; consider loading it from
     * configuration and calling the six-argument overload instead.
     */
    private static final String DEFAULT_PASSWORD = "123456";

    /** Not instantiable: static utility holder. */
    private DorisUtil() {
    }

    /**
     * Attaches a Doris sink to the given stream using the built-in default connection settings.
     *
     * <p>According to the original author's note, the stream elements are expected to be JSON
     * strings.
     *
     * @param stream   the data stream to persist
     * @param database the target database name
     * @param table    the target table name
     * @throws NullPointerException     if {@code stream} is {@code null}
     * @throws IllegalArgumentException if {@code database} or {@code table} is {@code null} or blank
     */
    public static void saveToDoris(DataStream<String> stream, String database, String table) {
        saveToDoris(stream, DEFAULT_FE_NODES, DEFAULT_USERNAME, DEFAULT_PASSWORD, database, table);
    }

    /**
     * Attaches a Doris sink to the given stream using explicit connection settings.
     *
     * <p>According to the original author's note, the stream elements are expected to be JSON
     * strings.
     *
     * @param stream   the data stream to persist
     * @param feNodes  the value handed to {@code DorisOptions.Builder#setFenodes}
     * @param username the Doris user name
     * @param password the Doris password; may be empty but not {@code null}
     * @param database the target database name
     * @param table    the target table name
     * @throws NullPointerException     if {@code stream} or {@code password} is {@code null}
     * @throws IllegalArgumentException if {@code feNodes}, {@code username}, {@code database} or
     *                                  {@code table} is {@code null} or blank
     */
    public static void saveToDoris(DataStream<String> stream,
                                   String feNodes,
                                   String username,
                                   String password,
                                   String database,
                                   String table) {
        // Fail fast with a named argument instead of building a bogus "null.table" identifier.
        if (stream == null) {
            throw new NullPointerException("stream must not be null");
        }
        if (password == null) {
            throw new NullPointerException("password must not be null");
        }
        requireText(feNodes, "feNodes");
        requireText(username, "username");
        requireText(database, "database");
        requireText(table, "table");

        // 1. Build the Doris connection options.
        DorisOptions dorisOptions = DorisOptions.builder()
                .setFenodes(feNodes)
                .setTableIdentifier(database + "." + table)
                .setUsername(username)
                .setPassword(password)
                .build();
        // 2. Create the Doris sink instance.
        SinkFunction<String> dorisSink = DorisSink.<String>sink(dorisOptions);
        // 3. Register the sink on the stream.
        stream.addSink(dorisSink);
    }

    /** Rejects {@code null} or whitespace-only values, naming the offending argument. */
    private static void requireText(String value, String name) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException(name + " must not be null or blank");
        }
    }

}